import re

from prefect import task
import pandas as pd
from prefect import get_run_logger

from application.db.mysql_db.info import ResourceInformationList
from prefect import Task, Flow

from application.tasks.base_task import BaseTask


class DeduplicationTask(BaseTask):
    """Filter out DataFrame rows whose ``_id`` is already stored in the database."""

    def run(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return the rows of ``df`` whose ``_id`` is not a stored information id.

        Args:
            df: Frame with an ``_id`` column. A boolean ``is_excluded`` column
                is written onto this frame in place as a side effect.

        Returns:
            A new frame holding only the rows whose ``_id`` was not found.
            It keeps the ``is_excluded`` column (all ``False``).
        """
        # NOTE(review): assumes ResourceInformationList is a peewee-style model,
        # so iterating the select yields row objects rather than raw values.
        # The ids therefore have to be read off each row; a set of row objects
        # would never compare equal to a plain id. Confirm against the model.
        rows = ResourceInformationList.select(ResourceInformationList.information_id)
        all_ids = {row.information_id for row in rows}

        # ``isin`` always yields a boolean mask (also for an empty frame), so
        # the ``~`` filter below is a real row filter in every case.
        df['is_excluded'] = df['_id'].isin(all_ids)
        return df[~df['is_excluded']]

    @staticmethod
    def deduplication_id(information_id, all_ids):
        """Return True when ``information_id`` is contained in ``all_ids``.

        ``run`` performs the same check in vectorised form; this helper is
        kept so that existing callers continue to work.
        """
        return information_id in all_ids
